-
Notifications
You must be signed in to change notification settings - Fork 45
refactor: make BGP peer and secret management system event-driven #790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: nsk-split-svc
Are you sure you want to change the base?
Conversation
6fee584 to
2c22c24
Compare
ffb8c29 to
eb5adeb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many thanks for taking a stab at this !
A couple comments inline, I think there is still room for more untangling of the logic.
Tell me if anything sounds unclear
| w := &PeerWatcher{ | ||
| log: log, | ||
| clientv3: clientv3, | ||
| handler: handler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of passing a handler here, could we emit events ?
i.e.
felixServer.GetFelixServerEventChan() <- &common.ProcessPeers{peers.Items}
and in the felixServer main loop explicitly calling handlers
func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
...
case *common.ProcessPeers:
s.peerHandler.ProcessPeers(evt)
| } | ||
| return nil | ||
| // Send the peer list to the handler for processing | ||
| return w.handler.ProcessPeers(peers.Items) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would then be an event
| // OnSecretChanged handles secret change events and triggers peer reconciliation | ||
| func (w *PeerWatcher) OnSecretChanged(secretName string) { | ||
| w.log.Infof("Secret '%s' changed, notifying handler", secretName) | ||
| w.handler.OnSecretChanged(secretName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing here, we could make OnSecretChanged be an event,
that way we wouldn't have to keep a reference to the handler here
| w.log.Infof("Secret '%s' changed, notifying handler", secretName) | ||
| w.handler.OnSecretChanged(secretName) | ||
| // Trigger a resync to apply the new secret | ||
| err := w.resyncPeers() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this shouldn't be needed anymore :
Before it was more convenient to call resyncPeers() as this was doing both scanning, and constituting the local cache of Peers+Secrets.
But now that the reconstitution is done in the handlers, we can do everything there, and maybe call the same code in both cases (peerChanged & secretChanged)
| func (sw *secretWatcher) OnAdd(obj interface{}, isInInitialList bool) { | ||
| func (sw *SecretWatcher) notifySecretChanged(secretName string) { | ||
| for _, handler := range sw.handlers { | ||
| handler.OnSecretChanged(secretName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we rely on the fact that the peerWatcher does not need to be informed when the secret changes, and we make a common.OnSecretChanged towards the felix server, we could remove the need to have keep a sw.handler reference.
We could simply
eventChan <- &common.OnSecretChanged{...}|
|
||
| nodeStatesByName map[string]common.LocalNodeSpec | ||
|
|
||
| secretWatcher watchers.SecretGetter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could probably remove the reference to the secretWatcher by adding a secret store in the peerHandler. We could create events emitted by the secretWatcher
common.SecretAdded{...}
common.SecretDelete{...}
that would be received in the felixServer event loop,
call felix.peerHandler.OnSecret{Added,Deleted}(evt)
which would update a local cache map.
Below h.secretWatcher.GetSecret(secretName, secretKey) just becomes a lookup in the map in question
| log: log, | ||
| clientv3: clientv3, | ||
| handler: handler, | ||
| secretWatcher: secretWatcher, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for simplicity, if we remove the need for handler's registration by introducing events
we could instantiate the secretWatcher directly here
i.e.
secretWatcher: watchers.NewSecretWatcher(k8sclient, log.WithFields(logrus.Fields{"component": "secret-watcher"}))
| activeSecrets := make(map[string]struct{}) | ||
| for _, peer := range peers.Items { | ||
| if peer.Spec.Password != nil && peer.Spec.Password.SecretKeyRef != nil { | ||
| secretName := peer.Spec.Password.SecretKeyRef.Name | ||
| activeSecrets[secretName] = struct{}{} | ||
| } | ||
| } | ||
| return true | ||
| } | ||
| w.secretWatcher.SweepStale(activeSecrets) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make this w.secretWatcher.OnPeerListUpdated(peers.Items) and have the sweep logic in the secret watcher ?
| } | ||
|
|
||
| func (sw *secretWatcher) GetSecret(name, key string) (string, error) { | ||
| func (sw *SecretWatcher) GetSecret(name, key string) (string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following on the comment above, we could directly start watching for secrets when OnPeerListUpdated() is called with a new Peer that has secrets attached.
And emit events when we receive the secret's contents.
| w.log.Warn("Unrecognized secret change event received. Ignoring...") | ||
| } | ||
| // Re-sync peers when watch events occur | ||
| err = w.resyncPeers() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have an isolated watcher, we could implement the watch logic :
- we would cache the list of peers locally on the peerWatcher
- when update/delete/create happens, we update the list, and send a copy as an event over the eventChan
We only resort to listing again if an error happens
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further simplification would be to send separated events for ADD/DEL/UPD over the eventchan, but this will require changing the handler's logic which might be beyond the scope of this PR
Signed-off-by: Aritra Basu <[email protected]>
eb5adeb to
d64a12d
Compare
Summary
This PR refactors the BGP peer and secret management system from a tightly-coupled handler-based architecture to a loosely-coupled event-driven system.
Changes by Component
1. PeerWatcher (
watchers/peers_watcher.go)cachedPeers map[string]calicov3.BGPPeerPeerAdded- single new peer addedPeerUpdated- single existing peer updatedPeerDeleted- single existing peer deletedPeersChanged- for initial state reconciliation only2. SecretWatcher (
watchers/secret_watcher.go)GetSecret()method (blocking) with eventsSweepStale()method (replaced withOnPeerListUpdated)SecretAdded- new secret availableSecretChanged- existing secret modifiedSecretDeleted- existing secret removedOnPeerListUpdated()- starts watches when peers are added3. PeerHandler (
routing/peer_handler.go)secretCache map[string]map[string]stringOnPeerAdded(peer)- O(1) processing for single peerOnPeerUpdated(old, new)- O(1) diff and updateOnPeerDeleted(peer)- O(1) removalOnSecretAdded(secretData)- cache secret locallyOnSecretChanged(secretName)- mark affected peers for updateOnSecretDeleted(secretName)- remove from cacheProcessPeers()for initial reconciliation only4. FelixServer (
felix/felix_server.go)PeersChanged,PeerAdded,PeerUpdated,PeerDeleted,SecretAdded,SecretChanged,SecretDeletedMade the changes on top of the nsk-split-svc branch for now, will rebase on top of
masteronce those commits forsingle thread agentare merged.